1 /*
2 * Copyright (C) 2011 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5 * in compliance with the License. You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software distributed under the License
10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11 * or implied. See the License for the specific language governing permissions and limitations under
12 * the License.
13 */
14
15 package com.google.common.collect;
16
17 import com.google.common.annotations.Beta;
18 import com.google.common.base.Preconditions;
19
20 import java.util.ArrayDeque;
21 import java.util.Collection;
22 import java.util.Deque;
23 import java.util.PriorityQueue;
24 import java.util.Queue;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.LinkedBlockingDeque;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.PriorityBlockingQueue;
31 import java.util.concurrent.SynchronousQueue;
32 import java.util.concurrent.TimeUnit;
33
34 /**
35 * Static utility methods pertaining to {@link Queue} and {@link Deque} instances.
36 * Also see this class's counterparts {@link Lists}, {@link Sets}, and {@link Maps}.
37 *
38 * @author Kurt Alfred Kluever
39 * @since 11.0
40 */
41 public final class Queues {
/** Prevents instantiation; every member of this class is static. */
private Queues() {
}
43
44 // ArrayBlockingQueue
45
46 /**
47 * Creates an empty {@code ArrayBlockingQueue} with the given (fixed) capacity
48 * and nonfair access policy.
49 */
50 public static <E> ArrayBlockingQueue<E> newArrayBlockingQueue(int capacity) {
51 return new ArrayBlockingQueue<E>(capacity);
52 }
53
54 // ArrayDeque
55
56 /**
57 * Creates an empty {@code ArrayDeque}.
58 *
59 * @since 12.0
60 */
61 public static <E> ArrayDeque<E> newArrayDeque() {
62 return new ArrayDeque<E>();
63 }
64
65 /**
66 * Creates an {@code ArrayDeque} containing the elements of the specified iterable,
67 * in the order they are returned by the iterable's iterator.
68 *
69 * @since 12.0
70 */
71 public static <E> ArrayDeque<E> newArrayDeque(Iterable<? extends E> elements) {
72 if (elements instanceof Collection) {
73 return new ArrayDeque<E>(Collections2.cast(elements));
74 }
75 ArrayDeque<E> deque = new ArrayDeque<E>();
76 Iterables.addAll(deque, elements);
77 return deque;
78 }
79
80 // ConcurrentLinkedQueue
81
82 /**
83 * Creates an empty {@code ConcurrentLinkedQueue}.
84 */
85 public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue() {
86 return new ConcurrentLinkedQueue<E>();
87 }
88
89 /**
90 * Creates a {@code ConcurrentLinkedQueue} containing the elements of the specified iterable,
91 * in the order they are returned by the iterable's iterator.
92 */
93 public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue(
94 Iterable<? extends E> elements) {
95 if (elements instanceof Collection) {
96 return new ConcurrentLinkedQueue<E>(Collections2.cast(elements));
97 }
98 ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
99 Iterables.addAll(queue, elements);
100 return queue;
101 }
102
103 // LinkedBlockingDeque
104
105 /**
106 * Creates an empty {@code LinkedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE}.
107 *
108 * @since 12.0
109 */
110 public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque() {
111 return new LinkedBlockingDeque<E>();
112 }
113
114 /**
115 * Creates an empty {@code LinkedBlockingDeque} with the given (fixed) capacity.
116 *
117 * @throws IllegalArgumentException if {@code capacity} is less than 1
118 * @since 12.0
119 */
120 public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(int capacity) {
121 return new LinkedBlockingDeque<E>(capacity);
122 }
123
124 /**
125 * Creates a {@code LinkedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE},
126 * containing the elements of the specified iterable,
127 * in the order they are returned by the iterable's iterator.
128 *
129 * @since 12.0
130 */
131 public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(Iterable<? extends E> elements) {
132 if (elements instanceof Collection) {
133 return new LinkedBlockingDeque<E>(Collections2.cast(elements));
134 }
135 LinkedBlockingDeque<E> deque = new LinkedBlockingDeque<E>();
136 Iterables.addAll(deque, elements);
137 return deque;
138 }
139
140 // LinkedBlockingQueue
141
142 /**
143 * Creates an empty {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE}.
144 */
145 public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue() {
146 return new LinkedBlockingQueue<E>();
147 }
148
149 /**
150 * Creates an empty {@code LinkedBlockingQueue} with the given (fixed) capacity.
151 *
152 * @throws IllegalArgumentException if {@code capacity} is less than 1
153 */
154 public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(int capacity) {
155 return new LinkedBlockingQueue<E>(capacity);
156 }
157
158 /**
159 * Creates a {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE},
160 * containing the elements of the specified iterable,
161 * in the order they are returned by the iterable's iterator.
162 *
163 * @param elements the elements that the queue should contain, in order
164 * @return a new {@code LinkedBlockingQueue} containing those elements
165 */
166 public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(Iterable<? extends E> elements) {
167 if (elements instanceof Collection) {
168 return new LinkedBlockingQueue<E>(Collections2.cast(elements));
169 }
170 LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>();
171 Iterables.addAll(queue, elements);
172 return queue;
173 }
174
175 // LinkedList: see {@link com.google.common.collect.Lists}
176
177 // PriorityBlockingQueue
178
179 /**
180 * Creates an empty {@code PriorityBlockingQueue} with the ordering given by its
181 * elements' natural ordering.
182 *
183 * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
184 */
185 public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue() {
186 return new PriorityBlockingQueue<E>();
187 }
188
189 /**
190 * Creates a {@code PriorityBlockingQueue} containing the given elements.
191 *
192 * <b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue},
193 * this priority queue will be ordered according to the same ordering.
194 *
195 * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
196 */
197 public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue(
198 Iterable<? extends E> elements) {
199 if (elements instanceof Collection) {
200 return new PriorityBlockingQueue<E>(Collections2.cast(elements));
201 }
202 PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>();
203 Iterables.addAll(queue, elements);
204 return queue;
205 }
206
207 // PriorityQueue
208
209 /**
210 * Creates an empty {@code PriorityQueue} with the ordering given by its
211 * elements' natural ordering.
212 *
213 * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
214 */
215 public static <E extends Comparable> PriorityQueue<E> newPriorityQueue() {
216 return new PriorityQueue<E>();
217 }
218
219 /**
220 * Creates a {@code PriorityQueue} containing the given elements.
221 *
222 * <b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue},
223 * this priority queue will be ordered according to the same ordering.
224 *
225 * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
226 */
227 public static <E extends Comparable> PriorityQueue<E> newPriorityQueue(
228 Iterable<? extends E> elements) {
229 if (elements instanceof Collection) {
230 return new PriorityQueue<E>(Collections2.cast(elements));
231 }
232 PriorityQueue<E> queue = new PriorityQueue<E>();
233 Iterables.addAll(queue, elements);
234 return queue;
235 }
236
237 // SynchronousQueue
238
239 /**
240 * Creates an empty {@code SynchronousQueue} with nonfair access policy.
241 */
242 public static <E> SynchronousQueue<E> newSynchronousQueue() {
243 return new SynchronousQueue<E>();
244 }
245
246 /**
247 * Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested
248 * {@code numElements} elements are not available, it will wait for them up to the specified
249 * timeout.
250 *
251 * @param q the blocking queue to be drained
252 * @param buffer where to add the transferred elements
253 * @param numElements the number of elements to be waited for
254 * @param timeout how long to wait before giving up, in units of {@code unit}
255 * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
256 * @return the number of elements transferred
257 * @throws InterruptedException if interrupted while waiting
258 */
259 @Beta
260 public static <E> int drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements,
261 long timeout, TimeUnit unit) throws InterruptedException {
262 Preconditions.checkNotNull(buffer);
263 /*
264 * This code performs one System.nanoTime() more than necessary, and in return, the time to
265 * execute Queue#drainTo is not added *on top* of waiting for the timeout (which could make
266 * the timeout arbitrarily inaccurate, given a queue that is slow to drain).
267 */
268 long deadline = System.nanoTime() + unit.toNanos(timeout);
269 int added = 0;
270 while (added < numElements) {
271 // we could rely solely on #poll, but #drainTo might be more efficient when there are multiple
272 // elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
273 added += q.drainTo(buffer, numElements - added);
274 if (added < numElements) { // not enough elements immediately available; will have to poll
275 E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
276 if (e == null) {
277 break; // we already waited enough, and there are no more elements in sight
278 }
279 buffer.add(e);
280 added++;
281 }
282 }
283 return added;
284 }
285
286 /**
287 * Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)},
288 * but with a different behavior in case it is interrupted while waiting. In that case, the
289 * operation will continue as usual, and in the end the thread's interruption status will be set
290 * (no {@code InterruptedException} is thrown).
291 *
292 * @param q the blocking queue to be drained
293 * @param buffer where to add the transferred elements
294 * @param numElements the number of elements to be waited for
295 * @param timeout how long to wait before giving up, in units of {@code unit}
296 * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
297 * @return the number of elements transferred
298 */
299 @Beta
300 public static <E> int drainUninterruptibly(BlockingQueue<E> q, Collection<? super E> buffer,
301 int numElements, long timeout, TimeUnit unit) {
302 Preconditions.checkNotNull(buffer);
303 long deadline = System.nanoTime() + unit.toNanos(timeout);
304 int added = 0;
305 boolean interrupted = false;
306 try {
307 while (added < numElements) {
308 // we could rely solely on #poll, but #drainTo might be more efficient when there are
309 // multiple elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
310 added += q.drainTo(buffer, numElements - added);
311 if (added < numElements) { // not enough elements immediately available; will have to poll
312 E e; // written exactly once, by a successful (uninterrupted) invocation of #poll
313 while (true) {
314 try {
315 e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
316 break;
317 } catch (InterruptedException ex) {
318 interrupted = true; // note interruption and retry
319 }
320 }
321 if (e == null) {
322 break; // we already waited enough, and there are no more elements in sight
323 }
324 buffer.add(e);
325 added++;
326 }
327 }
328 } finally {
329 if (interrupted) {
330 Thread.currentThread().interrupt();
331 }
332 }
333 return added;
334 }
335
336 /**
337 * Returns a synchronized (thread-safe) queue backed by the specified queue. In order to
338 * guarantee serial access, it is critical that <b>all</b> access to the backing queue is
339 * accomplished through the returned queue.
340 *
341 * <p>It is imperative that the user manually synchronize on the returned queue when accessing
342 * the queue's iterator: <pre> {@code
343 *
344 * Queue<E> queue = Queues.synchronizedQueue(MinMaxPriorityQueue.<E>create());
345 * ...
346 * queue.add(element); // Needn't be in synchronized block
347 * ...
348 * synchronized (queue) { // Must synchronize on queue!
349 * Iterator<E> i = queue.iterator(); // Must be in synchronized block
350 * while (i.hasNext()) {
351 * foo(i.next());
352 * }
353 * }}</pre>
354 *
355 * <p>Failure to follow this advice may result in non-deterministic behavior.
356 *
357 * <p>The returned queue will be serializable if the specified queue is serializable.
358 *
359 * @param queue the queue to be wrapped in a synchronized view
360 * @return a synchronized view of the specified queue
361 * @since 14.0
362 */
363 public static <E> Queue<E> synchronizedQueue(Queue<E> queue) {
364 return Synchronized.queue(queue, null);
365 }
366
367 /**
368 * Returns a synchronized (thread-safe) deque backed by the specified deque. In order to
369 * guarantee serial access, it is critical that <b>all</b> access to the backing deque is
370 * accomplished through the returned deque.
371 *
372 * <p>It is imperative that the user manually synchronize on the returned deque when accessing
373 * any of the deque's iterators: <pre> {@code
374 *
375 * Deque<E> deque = Queues.synchronizedDeque(Queues.<E>newArrayDeque());
376 * ...
377 * deque.add(element); // Needn't be in synchronized block
378 * ...
379 * synchronized (deque) { // Must synchronize on deque!
380 * Iterator<E> i = deque.iterator(); // Must be in synchronized block
381 * while (i.hasNext()) {
382 * foo(i.next());
383 * }
384 * }}</pre>
385 *
386 * <p>Failure to follow this advice may result in non-deterministic behavior.
387 *
388 * <p>The returned deque will be serializable if the specified deque is serializable.
389 *
390 * @param deque the deque to be wrapped in a synchronized view
391 * @return a synchronized view of the specified deque
392 * @since 15.0
393 */
394 public static <E> Deque<E> synchronizedDeque(Deque<E> deque) {
395 return Synchronized.deque(deque, null);
396 }
397 }